import os
import subprocess

import pandas as pd

# Expects the following module-level configuration to be defined elsewhere:
# LST_WORKERS, LST_RECORDLEN, LST_RECORDNUM, TOTAL_LOAD_PER_EPOCH,
# WRITE_PATH and READ_PATH.

class fsXT(object):
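    """Drive the ttfs filesystem exerciser: generate a batch script of test
    epochs, run it in the background, collect the per-epoch logs and analyze
    the results."""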
def __init__(self):
self.epochs = dict()
self.len_epochs = 0
self.pwd = os.getcwd()
self.testscriptfile = '%s/batch'%self.pwd
def generate_data(self):
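        """Enumerate every (workers, record_length, record_number) combination
        and write one ttfs command line per unique epoch to self.testscriptfile."""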
es = set()
for arg_p in LST_WORKERS:
for arg_l in LST_RECORDLEN:
for arg_n in LST_RECORDNUM:
                    # Pick the file count so each epoch writes roughly
                    # TOTAL_LOAD_PER_EPOCH bytes, then recompute the record
                    # number to compensate for the integer truncation above.
                    arg_f = max(1, int(TOTAL_LOAD_PER_EPOCH / (arg_p*arg_l*arg_n)))
                    arg_n = max(1, int(TOTAL_LOAD_PER_EPOCH / (arg_p*arg_f*arg_l)))
cmdline = "%s/ttfs -w%s -r%s -p%d -l%d -n%d -f%d" % \
(self.pwd, WRITE_PATH, READ_PATH, arg_p, arg_l, arg_n, arg_f)
es.add(cmdline)
        with open(self.testscriptfile, 'w') as sf:
            for arg_e, cmd in enumerate(es, start=1):
                sf.write('%s -e%d\n' % (cmd, arg_e))
self.len_epochs = len(es)
        print('The test script file was generated and saved in %s' % self.testscriptfile)
def run_test(self):
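        """Mark the generated batch script executable and launch it in the
        background via nohup."""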
cmdline = 'chmod +x %s;nohup %s &'%(self.testscriptfile, self.testscriptfile)
print(cmdline)
process = subprocess.Popen(cmdline, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=True)
process.communicate()
        print('NOTICE: THIS IS A TIME-CONSUMING JOB. WAIT UNTIL ALL EPOCHS HAVE FINISHED; CHECK THE STATUS ON THE HOST.')
def collect_data(self):
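        """Gather the ttfs logs into /tmp/fsXTlog and split them by record
        type: "1," rows go to files.log, "2," rows to timeticks.log and "3,"
        rows to epochs.log, with each row tagged by its epoch number."""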
cmdline = \
"""
        mkdir -p /tmp/fsXTlog;pushd /tmp/fsXTlog;rm -f files.log timeticks.log epochs.log;mv -f ../ttfslog.???? ./;
for i in {1..%d};do cat ttfslog.`printf "%%04d" $i`|grep -E "^1,"|sed `printf 's/^[^,]*,/%%d,/' ${i}` >> files.log;done
for i in {1..%d};do cat ttfslog.`printf "%%04d" $i`|grep -E "^2,"|sed `printf 's/^[^,]*,/%%d,/' ${i}` >> timeticks.log;done
for i in {1..%d};do cat ttfslog.`printf "%%04d" $i`|grep -E "^3,"|cut -f1 -d"," --complement >> epochs.log;done
"""%(self.len_epochs, self.len_epochs, self.len_epochs)
        # The script relies on bash features (pushd, {1..N} brace expansion),
        # so run it with bash explicitly instead of the default /bin/sh.
        process = subprocess.Popen(cmdline, stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
                                   shell=True, executable='/bin/bash')
        process.communicate()
        print('Finished collecting data from /tmp/ttfslog.*; results saved to /tmp/fsXTlog/files.log, timeticks.log and epochs.log')
def wrangle_data(self):
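        """Load the three collected logs into DataFrames, derive per-epoch and
        per-file throughput metrics, and score each epoch on a 0-100 scale."""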
self.df_epochs = pd.read_csv("/tmp/fsXTlog/epochs.log", header=None)
self.df_epochs.columns = ['epoch', 'workers', 'record_length', 'record_number', 'files', 'start_timestamp', 'end_timestamp']
self.df_files = pd.read_csv("/tmp/fsXTlog/files.log", header=None)
self.df_files.columns = ['epoch', 'worker_id', 'filename','record_length', 'record_number', 'md5_write', 'md5_read',
'tlen_open','tlen_close','tlen_mv','tlen_write', 'tlen_read']
self.df_timeticks = pd.read_csv("/tmp/fsXTlog/timeticks.log", header=None)
self.df_timeticks.columns = ['epoch', 'worker_id', 'checkpoint','unit_wbytes', 'unit_rbytes', 'elapsed_time']
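        # Convert the per-tick byte counters to MiB (assuming ttfs logs raw bytes).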
self.df_timeticks['unit_wbytes'] = self.df_timeticks['unit_wbytes']/1024**2
self.df_timeticks['unit_rbytes'] = self.df_timeticks['unit_rbytes']/1024**2
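        # Derived per-epoch metrics: total file count, file size, wall-clock
        # duration, total load and aggregate throughput in MiB/s.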
self.df_epochs['total_files'] = self.df_epochs['files']*self.df_epochs['workers']
self.df_epochs['file_size'] = self.df_epochs['record_length']*self.df_epochs['record_number']
self.df_epochs['duration'] = self.df_epochs['end_timestamp'] - self.df_epochs['start_timestamp']
self.df_epochs['total_load'] = self.df_epochs['total_files']*self.df_epochs['file_size']
self.df_epochs['e_speed'] = self.df_epochs['total_load']/self.df_epochs['duration']/1024**2
self.df_epochs.index = self.df_epochs['epoch']
self.df_epochs.index.name = ""
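        # Per-file time breakdown and read/write throughput in MiB/s.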
self.df_files['tlen_file'] = self.df_files['tlen_open']+self.df_files['tlen_close'] \
+self.df_files['tlen_mv']+self.df_files['tlen_write']+self.df_files['tlen_read']
self.df_files.drop(columns = ['record_length', 'record_number'], inplace=True)
self.df_files = pd.merge(self.df_files, self.df_epochs, how='left', left_on=['epoch'], right_on = ['epoch'])
self.df_files['fw_speed'] = self.df_files['file_size']/self.df_files['tlen_write']/1024**2
self.df_files['fr_speed'] = self.df_files['file_size']/self.df_files['tlen_read']/1024**2
        # Min/max/range of each speed metric across all files, used to
        # normalize the 0-100 scores below.
        df_score = self.df_files[['e_speed','fw_speed','fr_speed']].agg(['min','max'])
        df_score.loc['range'] = df_score.loc['max'] - df_score.loc['min']
df_score_rst = self.df_files[['epoch','e_speed','fw_speed', 'fr_speed']].groupby(['epoch']).mean()
df_score_rst['epoch_score'] = (df_score_rst['e_speed'] - df_score.loc['min','e_speed'])/df_score.loc['range','e_speed']*100.0
df_score_rst['fw_score'] = (df_score_rst['fw_speed'] - df_score.loc['min','fw_speed'])/df_score.loc['range','fw_speed']*100.0
df_score_rst['fr_score'] = (df_score_rst['fr_speed'] - df_score.loc['min','fr_speed'])/df_score.loc['range','fr_speed']*100.0
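        # Composite score: epoch throughput weighted 0.7, file write 0.2, file read 0.1.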
df_score_rst['score'] = df_score_rst['epoch_score']*0.7+df_score_rst['fw_score']*0.2+df_score_rst['fr_score']*0.1
self.df_epochs = pd.merge(self.df_epochs, df_score_rst[['fw_speed', 'fr_speed','epoch_score','fw_score','fr_score','score']], \
how='inner', left_index=True, right_index=True)
print('Finished wrangling data.')
def eval_reliability(self):
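        """Check data integrity: every file's write-time MD5 digest must match
        the digest computed when it was read back."""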
        df_nonreliability = self.df_files.query('md5_write != md5_read')
        if len(df_nonreliability) > 0:
            print('!!!!!!! Failed to pass the Reliability Test !!!!!!!')
            print('------ The following files have different write and read digests: -----')
            print(df_nonreliability)
        else:
            print('### Passed the Reliability Test ###')
    def analyze_correlation(self):
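        """Rank the test parameters by how strongly they correlate with epoch
        and per-worker throughput."""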
tmpdf = self.df_epochs[['workers', 'record_length', 'record_number', 'files', \
'total_files', 'file_size', 'e_speed']]
        self.__analyze_correlation(tmpdf, 'e_speed', 'EPOCH PERFORMANCE')
tmpdf = self.df_files[['workers', 'record_length', 'record_number', \
'file_size', 'files','total_files', 'fw_speed', 'fr_speed']]
        self.__analyze_correlation(tmpdf, 'fw_speed', 'WORKER PERFORMANCE')
        self.__analyze_correlation(tmpdf, 'fr_speed', 'WORKER PERFORMANCE')
    def __analyze_correlation(self, df, basic_col, df_label):
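        """Print the correlation of every column of df with basic_col,
        ordered by absolute strength."""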
sr_corr = df.corr()[basic_col]
sr_corr.dropna(inplace=True)
sr_corr.drop(labels=[basic_col], inplace=True)
order = sr_corr.abs().sort_values(ascending = False)
        print('### Correlation analysis of %s based on "%s" ###' % (df_label, basic_col))
print(sr_corr[order.index])
        #print('--- the strongest factor influencing "%s" is "%s"' % (basic_col, order.index[0]))
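
# A minimal usage sketch, assuming the configuration constants noted at the
# top of this module are set; the values below are illustrative placeholders.
if __name__ == '__main__':
    # e.g. LST_WORKERS = [1, 4]; LST_RECORDLEN = [4096]; LST_RECORDNUM = [1024]
    #      TOTAL_LOAD_PER_EPOCH = 1 << 30; WRITE_PATH = '/tmp/w'; READ_PATH = '/tmp/r'
    xt = fsXT()
    xt.generate_data()        # emit the batch script of ttfs epochs
    xt.run_test()             # launch it in the background; wait for it to finish
    xt.collect_data()         # split the ttfs logs into /tmp/fsXTlog/*.log
    xt.wrangle_data()         # build the DataFrames and per-epoch scores
    xt.eval_reliability()     # compare write/read MD5 digests
    xt.analyze_correlation()  # rank the parameters driving throughput
    print(xt.df_epochs.sort_values('score', ascending=False))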